package drds.plus.repository.berkeley.spi;

import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.LockTimeoutException;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.rep.ReplicaWriteException;
import drds.plus.executor.cursor.cursor.Cursor;
import drds.plus.executor.cursor.cursor.impl.DuplicateValueRowDataLinkedList;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaData;
import drds.plus.executor.record_codec.RecordCodec;
import drds.plus.executor.record_codec.RecordCodecFactory;
import drds.plus.executor.record_codec.record.KeyValueRecordPair;
import drds.plus.executor.record_codec.record.Record;
import drds.plus.executor.row_values.RowValues;
import drds.plus.executor.utils.Utils;
import drds.plus.sql_process.abstract_syntax_tree.configuration.ColumnMetaData;
import drds.plus.sql_process.abstract_syntax_tree.configuration.IndexMapping;
import drds.plus.util.GeneralUtil;

import java.util.*;

public class CursorImpl implements Cursor {

    com.sleepycat.je.Cursor cursor;
    DatabaseEntry keyEntry = new DatabaseEntry();
    DatabaseEntry valueEntry = new DatabaseEntry();
    LockMode lockMode;
    IndexMapping indexMapping;
    KeyValueRecordPair keyValueRecordPair = new KeyValueRecordPair();
    RecordCodec keyRecordCodec;
    RecordCodec valueRecordCodec;
    Record keyRecord;
    Record valueRecord;

    CursorMetaData cursorMetaData = null;
    DatabaseEntry emptyValueEntry = new DatabaseEntry();
    List<ColumnMetaData> columnMetaDataList = null;
    private boolean isBeforeFirst = false;

    {
        emptyValueEntry.setData(new byte[1]);
    }

    public CursorImpl(IndexMapping indexMapping, com.sleepycat.je.Cursor cursor, LockMode lockMode) {
        this.indexMapping = indexMapping;
        this.cursorMetaData = Utils.convertIndexMetaDataToCursorMetaData(indexMapping);
        this.cursor = cursor;
        this.lockMode = lockMode;
        this.keyRecordCodec = RecordCodecFactory.newRecordCodec(indexMapping.getKeyColumnMetaDataList());
        if (indexMapping.isPrimaryKeyIndex()) {
            this.valueRecordCodec = RecordCodecFactory.newRecordCodec(indexMapping.getValueColumnMetaDataList());
        } else {
            this.valueRecordCodec = RecordCodecFactory.newRecordCodec(indexMapping.getValueColumnMetaDataList());
        }
        this.keyRecord = keyRecordCodec.newRecord();
        this.valueRecord = valueRecordCodec.newRecord();
    }


    public static RowValues convertKeyValuePairToKeyValuePairRowData(CursorMetaData cursorMetaData, KeyValueRecordPair keyValueRecordPair) {
        if (keyValueRecordPair == null) {
            return null;
        }
        return new KeyValuePairRowValues(cursorMetaData, keyValueRecordPair);
    }

    private void setKeyValue() {
        keyValueRecordPair = new KeyValueRecordPair();
        keyValueRecordPair.setKey(keyRecordCodec.decode(keyEntry.getData()));
        if (valueEntry.getSize() == 0) {
            keyValueRecordPair.setValue(null);
        } else {
            keyValueRecordPair.setValue(valueRecordCodec.decode(valueEntry.getData()));
        }
    }


    public boolean skipTo(Record keyRecord) {
        keyEntry.setData(keyRecordCodec.encode(keyRecord));
        if (OperationStatus.SUCCESS == cursor.getSearchKeyRange(keyEntry, valueEntry, lockMode)) {
            setKeyValue();
            return true;
        } else {
            return false;
        }

    }

    public boolean skipToNoRange(Record keyRecord) {
        keyEntry.setData(keyRecordCodec.encode(keyRecord));
        OperationStatus operationStatus = cursor.getSearchKey(keyEntry, valueEntry, lockMode);
        if (OperationStatus.SUCCESS == operationStatus) {
            setKeyValue();
            return true;
        } else {
            return false;
        }
    }

    public List<RuntimeException> close(List<RuntimeException> runtimeExceptionList) {
        if (runtimeExceptionList == null) {
            runtimeExceptionList = new ArrayList<RuntimeException>();
        }
        try {
            cursor.close();
        } catch (Exception e) {
            runtimeExceptionList.add(new RuntimeException(e));
        }
        return runtimeExceptionList;
    }


    public RowValues first() throws RuntimeException {
        OperationStatus operationStatus = cursor.getFirst(keyEntry, valueEntry, lockMode);
        if (OperationStatus.SUCCESS == operationStatus) {
            setKeyValue();
            return convertKeyValuePairToKeyValuePairRowData(cursorMetaData, keyValueRecordPair);
        } else {
            return null;
        }
    }

    public RowValues prev() throws RuntimeException {
        OperationStatus operationStatus = cursor.getPrev(keyEntry, valueEntry, lockMode);
        if (OperationStatus.SUCCESS == operationStatus) {
            setKeyValue();
            return convertKeyValuePairToKeyValuePairRowData(cursorMetaData, keyValueRecordPair);
        } else {
            return null;
        }
    }


    public RowValues current() throws RuntimeException {
        if (keyValueRecordPair == null || keyValueRecordPair.getKey() == null) {
            OperationStatus operationStatus = cursor.getNext(keyEntry, valueEntry, lockMode);
            if (OperationStatus.SUCCESS == operationStatus) {
                setKeyValue();
                return convertKeyValuePairToKeyValuePairRowData(cursorMetaData, keyValueRecordPair);
            } else {
                return null;
            }
        } else {
            return convertKeyValuePairToKeyValuePairRowData(cursorMetaData, keyValueRecordPair);
        }
    }

    public RowValues next() {
        if (isBeforeFirst) {
            isBeforeFirst = false;
            return convertKeyValuePairToKeyValuePairRowData(cursorMetaData, keyValueRecordPair);
        }
        //
        OperationStatus operationStatus = cursor.getNext(keyEntry, valueEntry, lockMode);
        if (OperationStatus.SUCCESS == operationStatus) {
            // 任何情况都应该返回新的KVPair，要不然会覆盖上一次结果
            setKeyValue();
        } else {
            keyValueRecordPair = null;
        }
        return convertKeyValuePairToKeyValuePairRowData(cursorMetaData, keyValueRecordPair);
    }

    public RowValues last() throws RuntimeException {
        OperationStatus operationStatus = cursor.getLast(keyEntry, valueEntry, lockMode);
        if (OperationStatus.SUCCESS == operationStatus) {
            setKeyValue();
            return convertKeyValuePairToKeyValuePairRowData(cursorMetaData, keyValueRecordPair);
        } else {
            return null;
        }
    }

    public boolean skipTo(KeyValueRecordPair keyKeyValueRecordPair) throws RuntimeException {
        keyEntry.setData(keyRecordCodec.encode(keyKeyValueRecordPair.getKey()));
        if (valueEntry != null) {
            valueEntry.setData(valueRecordCodec.encode(keyKeyValueRecordPair.getValue()));
        }
        if (OperationStatus.SUCCESS == cursor.getSearchBothRange(keyEntry, valueEntry, lockMode)) {
            setKeyValue();
            return true;
        } else {
            return false;
        }
    }

    public RowValues getNextDuplicateValueRowData() throws RuntimeException {
        OperationStatus operationStatus = cursor.getNextDup(keyEntry, valueEntry, lockMode);
        if (OperationStatus.SUCCESS == operationStatus) {
            setKeyValue();
        } else {
            keyValueRecordPair = null;
        }
        return convertKeyValuePairToKeyValuePairRowData(cursorMetaData, keyValueRecordPair);
    }

    public void put(Record key, Record value) throws RuntimeException {
        DatabaseEntry keyEntry = new DatabaseEntry();
        DatabaseEntry valueEntry = new DatabaseEntry();
        keyEntry.setData(keyRecordCodec.encode(key));
        if (value != null) {
            valueEntry.setData(valueRecordCodec.encode(value));
        } else {
            valueEntry = emptyValueEntry;
        }
        try {
            cursor.put(keyEntry, valueEntry);
        } catch (LockTimeoutException ex) {
            LockTimeoutException ex1 = ex;
            while (ex1 != null) {
                try {
                    cursor.put(keyEntry, valueEntry);
                    ex1 = null;
                } catch (LockTimeoutException ex2) {
                    ex1 = ex2;
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        } catch (ReplicaWriteException ex) {
            throw new RuntimeException(ex);
        }
    }

    public boolean delete() throws RuntimeException {
        return OperationStatus.SUCCESS == cursor.delete();
    }


    public Map<Record, DuplicateValueRowDataLinkedList> mgetRecordToDuplicateValueRowDataLinkedListMap(List<Record> keyRecordList, boolean prefixMatch, boolean keyFilterOrValueFilter) throws RuntimeException {
        if (keyFilterOrValueFilter == false) {
            throw new UnsupportedOperationException("BDB的valueFilter的mget还未实现");
        }

        Map<Record, DuplicateValueRowDataLinkedList> recordToDuplicateKeyValuePairHashMap = new HashMap<Record, DuplicateValueRowDataLinkedList>(64);
        if (keyRecordList == null) {
            keyRecordList = Collections.emptyList();
        }
        for (Record record : keyRecordList) {
            if (prefixMatch) {
                throw new UnsupportedOperationException("not supported yet ");
            } else {
                nonPrefixMatch(recordToDuplicateKeyValuePairHashMap, record);
            }
        }

        return recordToDuplicateKeyValuePairHashMap;
    }

    private void nonPrefixMatch(Map<Record, DuplicateValueRowDataLinkedList> recordToDuplicateKeyValuePairHashMap, Record record) throws RuntimeException {
        // 跳转到这个cr的第一个
        boolean has = this.skipToNoRange(record);
        if (has) {
            DuplicateValueRowDataLinkedList duplicateValueRowDataLinkedList = buildDuplicateValueLinkedList();
            recordToDuplicateKeyValuePairHashMap.put(record, duplicateValueRowDataLinkedList);
        }
    }

    public List<DuplicateValueRowDataLinkedList> mgetDuplicateValueRowDataLinkedListList(List<Record> keyRecordList, boolean prefixMatch, boolean keyFilterOrValueFilter) throws RuntimeException {

        if (keyFilterOrValueFilter == false) {
            throw new UnsupportedOperationException("BDB的valueFilter的mget还未实现");
        }
        List<DuplicateValueRowDataLinkedList> duplicateValueRowDataLinkedList = new ArrayList<DuplicateValueRowDataLinkedList>(64);
        if (keyRecordList == null) {
            keyRecordList = Collections.emptyList();
        }
        for (Record record : keyRecordList) {
            if (prefixMatch) {
                throw new UnsupportedOperationException("not supported yet ");
            } else {
                nonPrefixMatchList(duplicateValueRowDataLinkedList, record);
            }
        }
        return duplicateValueRowDataLinkedList;
    }

    private void nonPrefixMatchList(List<DuplicateValueRowDataLinkedList> duplicateValueRowDataLinkedList, Record record) throws RuntimeException {
        // 跳转到这个cr的第一个
        boolean has = this.skipToNoRange(record);
        if (has) {
            DuplicateValueRowDataLinkedList duplicateValueRowDataLinkedList1 = buildDuplicateValueLinkedList();
            duplicateValueRowDataLinkedList.add(duplicateValueRowDataLinkedList1);
        }
    }

    private DuplicateValueRowDataLinkedList buildDuplicateValueLinkedList() throws RuntimeException {
        RowValues current = this.current();
        DuplicateValueRowDataLinkedList duplicateValueRowDataLinkedList = new DuplicateValueRowDataLinkedList(current);
        while ((current = getNextDuplicateValueRowData()) != null) {// 添加这这个cr的重复数据列
            duplicateValueRowDataLinkedList.next = new DuplicateValueRowDataLinkedList(current);
            duplicateValueRowDataLinkedList = duplicateValueRowDataLinkedList.next;
        }
        return duplicateValueRowDataLinkedList;
    }

    public String toStringWithInden(int inden) {
        StringBuilder sb = new StringBuilder();
        String tab = GeneralUtil.getTab(inden);
        sb.append(tab).append("【Je cursor : ").append("\n");
        if (indexMapping != null) {
            sb.append(indexMapping.toStringWithInden(inden + 1));
        }
        sb.append("\n");
        return sb.toString();
    }

    public String toString() {
        return toStringWithInden(0);
    }

    public void beforeFirst() throws RuntimeException {
        this.isBeforeFirst = true;
        // 跳到第一个之前
        this.first();
    }

    public CursorMetaData getCursorMetaData() {
        return cursorMetaData;
    }

    public void setCursorMetaData(CursorMetaData cursorMetaData) {
        this.cursorMetaData = cursorMetaData;
    }

    public List<ColumnMetaData> getColumnMetaDataList() throws RuntimeException {
        if (columnMetaDataList == null) {
            columnMetaDataList = new ArrayList(indexMapping.getKeyColumnMetaDataList().size() + indexMapping.getValueColumnMetaDataList().size());
            columnMetaDataList.addAll(indexMapping.getKeyColumnMetaDataList());
            columnMetaDataList.addAll(indexMapping.getValueColumnMetaDataList());
        }

        return columnMetaDataList;
    }

    public boolean isDone() {
        return true;
    }

}
